package com.offcn.bigdata.spark.p3

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

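/**
  * Spark broadcast variables study.
  *    Rewrites the `join` transformation using a broadcast variable instead,
  *    which is effectively a map-side join (joining a large table with a small one).
  *    The plain `join` operation, by contrast, is a reduce-side join.
  *
  * Broadcast variables are not suitable for large values,
  * nor for values that are updated frequently.
  */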
object _03BroadcastOps {

    /** A student record; `id` is the key that [[Score.sid]] is joined against. */
    private final case class Student(id: Int, name: String, age: Int)

    /** One exam result; `sid` is the id of the [[Student]] it belongs to. */
    private final case class Score(sid: Int, course: String, score: Float)

    /** Small table shared by both demos (ids 1-4 have scores, 10086 has none). */
    private val students: List[Student] = List(
        Student(1, "刘博", 15),
        Student(2, "霍龙飞", 16),
        Student(3, "付云瑾", 17),
        Student(4, "何浩", 18),
        Student(10086, "刘武", 28)
    )

    /** Score records shared by both demos (sid 10010 has no matching student). */
    private val scores: List[Score] = List(
        Score(1, "语文", 70.5f),
        Score(2, "数学", 80.5f),
        Score(3, "英语", 30.5f),
        Score(4, "体育", 99f),
        Score(10010, "语文", 70.5f)
    )

    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
            // The runtime class of a Scala object is named "<Name>$"; drop the suffix.
            .setAppName(getClass.getSimpleName.stripSuffix("$"))
            .setMaster("local[*]")

        val sc = new SparkContext(conf)
        // Release the context even if a demo throws.
        try {
//            joinOps(sc)
            broadcastOps(sc)
        } finally {
            sc.stop()
        }
    }

    /**
      * Map-side join: the small student table is shipped to executors as a broadcast
      * variable and looked up locally, so the score RDD is only transformed with a
      * narrow (non-shuffling) `mapPartitions`.
      *
      * Every score is kept; a score whose `sid` has no student is printed with the
      * marker "UnKnow" (left-outer semantics, unlike the inner join in [[joinOps]]).
      * Printing happens inside `foreach`, i.e. on the executors (the driver console
      * only when running in local mode).
      *
      * @param sc an active SparkContext
      */
    def broadcastOps(sc: SparkContext): Unit = {
        val stus: Map[Int, Student] = students.map(stu => (stu.id, stu)).toMap

        val scoreRDD: RDD[Score] = sc.parallelize(scores)

        // Turn the ordinary driver-side Map into a broadcast variable.
        val stuMapBC: Broadcast[Map[Int, Student]] = sc.broadcast(stus)

        // Option[Student] keeps the element type precise; mixing a Student with a
        // String fallback would widen it to java.io.Serializable.
        val joined: RDD[(Score, Option[Student])] = scoreRDD.mapPartitions { partition =>
            // Read the broadcast value once per partition rather than once per record.
            val stuMap = stuMapBC.value
            partition.map(score => (score, stuMap.get(score.sid)))
        }

        joined.foreach { case (score, stu) =>
            println((score, stu.fold("UnKnow")(_.toString)))
        }
    }

    /**
      * Reduce-side join: both data sets are keyed by student id and combined with
      * `RDD.join`. Only ids present on both sides survive (inner join), so student
      * 10086 and score sid 10010 do not appear in the output.
      *
      * @param sc an active SparkContext
      */
    def joinOps(sc: SparkContext): Unit = {
        val id2Stu: RDD[(Int, Student)] = sc.parallelize(students).map(stu => (stu.id, stu))
        val id2Score: RDD[(Int, Score)] = sc.parallelize(scores).map(score => (score.sid, score))

        val info: RDD[(Int, (Student, Score))] = id2Stu.join(id2Score)
        info.foreach { case (id, (stu, score)) =>
            println(s"id: ${id}, student: ${stu}, score: ${score}")
        }
    }
}
